package com.migrate.module.service.impl;

import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.util.StrUtil;
import com.baomidou.mybatisplus.core.toolkit.CollectionUtils;
import com.migrate.module.domain.BinLog;
import com.migrate.module.domain.EtlProgress;
import com.migrate.module.domain.EtlStatistical;
import com.migrate.module.domain.RangeScroll;
import com.migrate.module.enumeration.BinlogType;
import com.migrate.module.enumeration.DBChannel;
import com.migrate.module.mapper.migrate.MigrateScrollMapper;
import com.migrate.module.migrate.ScrollProcessor;
import com.migrate.module.service.MigrateService;
import com.migrate.module.util.DateUtils;
import com.migrate.module.util.MigrateUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;

import javax.annotation.Resource;
import java.lang.reflect.Method;
import java.math.BigDecimal;
import java.math.RoundingMode;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;


/**
 * 数据同步服务实现类
 *
 * @author zhonghuashishan
 */
@Service
@Slf4j
public class MigrateServiceImpl implements MigrateService {

    @Resource
    private MigrateScrollMapper migrateScrollMapper;

    @Resource
    private ScrollProcessor scrollProcessor;

    @Override
    public boolean migrateBat(String tableName, List<BinLog> binLogs)
    {
        log.info("开始执行migrateBat方法，tableName=" + tableName+",本次操作"+binLogs.size()+"条记录");
        if (StrUtil.isNotBlank(tableName) && CollUtil.isNotEmpty(binLogs))
        {
            // 根据表名取得第二个数据源的Mapper
            Object mapper = MigrateUtil.getD2MapperByTableName(tableName);
            if (null != mapper)
            {
                try
                {
                    List <Map <String, Object>> insertMaps = new ArrayList<>();
                    for (BinLog binLog : binLogs)
                    {
                        Method targetMethod = null;
                        if (BinlogType.INSERT.getValue().equals(binLog.getOperateType()))
                        {
                            // 新增操作单独拎出来做批量新增，不然执行效率太低
                            insertMaps.add(binLog.getDataMap());
                        }
                        else if (BinlogType.UPDATE.getValue().equals(binLog.getOperateType()))
                        {
                            //处理一下更新的null异常对象
                            binLog.setDataMap(MigrateUtil.updateNullValue(binLog.getDataMap()));
                            // 通过反射获取修改方法
                            targetMethod = mapper.getClass().getDeclaredMethod("update", Map.class);
                        }
                        else if (BinlogType.DELETE.getValue().equals(binLog.getOperateType()))
                        {
                            // 通过反射获取删除方法
                            targetMethod = mapper.getClass().getDeclaredMethod("delete", Map.class);
                        }
                        if (null != targetMethod)
                        {
                            // 通过反射执行方法
                            targetMethod.invoke(mapper, binLog.getDataMap());
                        }
                    }
                    // 批量新增
                    if (CollUtil.isNotEmpty(insertMaps))
                    {
                        MigrateUtil.removeNullValue(insertMaps);
                        mapper.getClass().getDeclaredMethod("insertBat", List.class).invoke(mapper, insertMaps);
                    }
                }
                catch (Exception e)
                {
                    log.error("migrateBat () tableName=" + tableName, e);
                    return false;
                }
                return true;
            }
        }
        return false;
    }

    @Override
    @SuppressWarnings({"unchecked"})
    public List<Map<String, Object>> findByIdentifiers(String tableName, List<String> identifiers,String dbChannel)
    {
        if (StrUtil.isNotBlank(tableName) && CollUtil.isNotEmpty(identifiers))
        {
            try
            {
                Object mapper;
                if (DBChannel.CHANNEL_1.getValue().equals(dbChannel)){
                    mapper = MigrateUtil.getD1MapperByTableName(tableName);
                } else {
                    // 根据表名取得第二个数据源的Mapper
                    mapper = MigrateUtil.getD2MapperByTableName(tableName);
                }
                if (null != mapper)
                {
                    // 通过反射获取selectByIdentifiers方法
                    Method targetMethod = mapper.getClass().getDeclaredMethod("selectByIdentifiers", List.class);
                    // 通过反射执行selectByIdentifiers方法
                    Object returnValue = targetMethod.invoke(mapper, identifiers);
                    if (null != returnValue)
                    {
                        return MigrateUtil.toCamelCaseMapList((List<Map<String, Object>>)returnValue);
                    }
                    return new ArrayList<>();
                }
            }
            catch (Exception e)
            {
                log.error("findByIdentifiers方法执行出错", e);
                return new ArrayList<>();
            }
        }
        return new ArrayList<>();
    }

    @Override
    @SuppressWarnings({"unchecked"})
    public List<Map<String, Object>> queryInfoList(RangeScroll rangeScroll) {
        if (StrUtil.isNotBlank(rangeScroll.getTableName()) && StrUtil.isNotBlank(rangeScroll.getStartScrollId()))
        {
            try
            {
                Object mapper =  MigrateUtil.getD1MapperByTableName(rangeScroll.getTableName());

                if (null != mapper)
                {
                    // 通过反射获取queryInfoList方法
                    Method targetMethod = mapper.getClass().getDeclaredMethod("queryInfoList", RangeScroll.class);
                    // 通过反射执行queryInfoList方法
                    Object returnValue = targetMethod.invoke(mapper, rangeScroll);
                    if (null != returnValue)
                    {
                        return MigrateUtil.toCamelCaseMapList((List<Map<String, Object>>)returnValue);
                    }
                    return new ArrayList<>();
                }
            }
            catch (Exception e)
            {
                log.error("queryInfoList方法执行出错", e);
                return new ArrayList<>();
            }
        }
        return new ArrayList<>();
    }

    @Override
    public void compensateRangeScroll(Long id) {
        EtlProgress etlProgressInfo = migrateScrollMapper.queryEtlProgressById(id);
        RangeScroll rangeScroll = new RangeScroll();
        rangeScroll.setStartScrollId(etlProgressInfo.getScrollId());
        rangeScroll.setTableName(etlProgressInfo.getLogicModel());
        rangeScroll.setStartTime(etlProgressInfo.getScrollTime());
        //补偿再次发起
        scrollProcessor.scroll(rangeScroll);
    }

    @Override
    public List<EtlProgress> getEtlProgresses(EtlProgress queryCondition)
    {
        try
        {
            if (null == queryCondition)
            {
                // 防止传个null过来造成mybatis处理出错
                queryCondition = new EtlProgress();
            }
            List<EtlProgress> progressList = migrateScrollMapper.queryEtlProgressList(queryCondition);
            if (CollectionUtils.isNotEmpty(progressList)){
                for (EtlProgress etlProgress:progressList){
                    EtlStatistical etlStatistical = new EtlStatistical();
                    etlStatistical.setLogicModel(etlProgress.getLogicModel());
                    etlStatistical.setStartTime(DateUtils.format(etlProgress.getScrollTime()) );
                    etlStatistical.setEndTime(DateUtils.format(etlProgress.getScrollEndTime()));
                    // 获取已同步的数据（通过CountCacheTask分天统计计算的数据）
                    BigDecimal statisticalCount = migrateScrollMapper.getStatisticalCount(etlStatistical);
                    // 如果存在已经同步的数据数量，则计算同步进度，否则设置同步进度为0%
                    if (null != statisticalCount && null != etlProgress.getFinishRecord())
                    {
                        BigDecimal progressScale = new BigDecimal(etlProgress.getFinishRecord()).divide(statisticalCount,2,BigDecimal.ROUND_HALF_UP);
                        // 因为前端展示的进度条需要的是百分比的数字，所以这里把结果乘以100
                        etlProgress.setProgressScale(progressScale.multiply(new BigDecimal(100)));
                    }
                    else
                    {
                        etlProgress.setProgressScale(BigDecimal.ZERO);
                    }
                }
            }
            return progressList;
        } catch (Exception e){
            log.error("getEtlProgresses方法执行出错", e);
            return new ArrayList<>();
        }
    }

    @Override
    public String queryMinOrderNo(RangeScroll rangeScroll) {
        if (StrUtil.isNotBlank(rangeScroll.getTableName()))
        {
            try
            {
                Object mapper =  MigrateUtil.getD1MapperByTableName(rangeScroll.getTableName());

                if (null != mapper)
                {
                    // 通过反射获取queryInfoList方法
                    Method targetMethod = mapper.getClass().getDeclaredMethod("queryMinOrderNo", RangeScroll.class);
                    // 通过反射执行queryInfoList方法
                    Object returnValue = targetMethod.invoke(mapper, rangeScroll);
                    if (null != returnValue)
                    {
                        return String.valueOf(Long.parseLong((String)returnValue)-1);
                    }
                    return "0";
                }
            }
            catch (Exception e)
            {
                log.error("queryInfoList方法执行出错", e);
                return "0";
            }
        }
        return "0";
    }


    @Override
    public List <String> getScrollAbleTables()
    {
        return migrateScrollMapper.getScrollAbleTables();
    }

}
